/*
 * Copyright 1999-2020 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package config_client

import (
	"fmt"
	"math"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/pkg/errors"

	"github.com/nacos-group/nacos-sdk-go/v2/common/monitor"

	"github.com/aliyun/alibaba-cloud-sdk-go/services/kms"
	"github.com/nacos-group/nacos-sdk-go/v2/clients/cache"
	"github.com/nacos-group/nacos-sdk-go/v2/clients/nacos_client"
	"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
	"github.com/nacos-group/nacos-sdk-go/v2/common/logger"
	"github.com/nacos-group/nacos-sdk-go/v2/common/nacos_error"
	"github.com/nacos-group/nacos-sdk-go/v2/common/remote/rpc/rpc_request"
	"github.com/nacos-group/nacos-sdk-go/v2/common/remote/rpc/rpc_response"
	"github.com/nacos-group/nacos-sdk-go/v2/inner/uuid"
	"github.com/nacos-group/nacos-sdk-go/v2/model"
	"github.com/nacos-group/nacos-sdk-go/v2/util"
	"github.com/nacos-group/nacos-sdk-go/v2/vo"
)

const (
	perTaskConfigSize = 3000
	executorErrDelay  = 5 * time.Second
)

type ConfigClient struct {
	nacos_client.INacosClient
	kmsClient       *kms.Client
	localConfigs    []vo.ConfigParam
	mutex           sync.Mutex
	configProxy     IConfigProxy
	configCacheDir  string
	lastAllSyncTime time.Time
	cacheMap        cache.ConcurrentMap
	uid             string
	listenExecute   chan struct{}
}

type cacheData struct {
	isInitializing    bool
	dataId            string
	group             string
	content           string
	contentType       string
	tenant            string
	cacheDataListener *cacheDataListener
	md5               string
	appName           string
	taskId            int
	configClient      *ConfigClient
	isSyncWithServer  bool
}

type cacheDataListener struct {
	listener vo.Listener
	lastMd5  string
}

func NewConfigClient(nc nacos_client.INacosClient) (*ConfigClient, error) {
	config := &ConfigClient{}
	config.INacosClient = nc
	clientConfig, err := nc.GetClientConfig()
	if err != nil {
		return nil, err
	}
	serverConfig, err := nc.GetServerConfig()
	if err != nil {
		return nil, err
	}
	httpAgent, err := nc.GetHttpAgent()
	if err != nil {
		return nil, err
	}

	if err = initLogger(clientConfig); err != nil {
		return nil, err
	}
	clientConfig.CacheDir = clientConfig.CacheDir + string(os.PathSeparator) + "config"
	config.configCacheDir = clientConfig.CacheDir

	if config.configProxy, err = NewConfigProxy(serverConfig, clientConfig, httpAgent); err != nil {
		return nil, err
	}

	if clientConfig.OpenKMS {
		kmsClient, err := kms.NewClientWithAccessKey(clientConfig.RegionId, clientConfig.AccessKey, clientConfig.SecretKey)
		if err != nil {
			return nil, err
		}
		config.kmsClient = kmsClient
	}

	uid, err := uuid.NewV4()
	if err != nil {
		return nil, err
	}
	config.uid = uid.String()

	config.cacheMap = cache.NewConcurrentMap()
	// maximum buffered queue to prevent chan deadlocks during frequent configuration file updates
	config.listenExecute = make(chan struct{}, math.MaxInt64)

	config.startInternal()

	return config, err
}

func initLogger(clientConfig constant.ClientConfig) error {
	return logger.InitLogger(logger.BuildLoggerConfig(clientConfig))
}

func (client *ConfigClient) GetConfig(param vo.ConfigParam) (content string, err error) {
	content, err = client.getConfigInner(param)

	if err != nil {
		return "", err
	}

	return client.decrypt(param.DataId, content)
}

func (client *ConfigClient) decrypt(dataId, content string) (string, error) {
	if client.kmsClient != nil && strings.HasPrefix(dataId, "cipher-") {
		request := kms.CreateDecryptRequest()
		request.Method = "POST"
		request.Scheme = "https"
		request.AcceptFormat = "json"
		request.CiphertextBlob = content
		response, err := client.kmsClient.Decrypt(request)
		if err != nil {
			return "", fmt.Errorf("kms decrypt failed: %v", err)
		}
		content = response.Plaintext
	}
	return content, nil
}

func (client *ConfigClient) encrypt(dataId, content string) (string, error) {
	if client.kmsClient != nil && strings.HasPrefix(dataId, "cipher-") {
		request := kms.CreateEncryptRequest()
		request.Method = "POST"
		request.Scheme = "https"
		request.AcceptFormat = "json"
		request.KeyId = "alias/acs/mse" // use default key
		request.Plaintext = content
		response, err := client.kmsClient.Encrypt(request)
		if err != nil {
			return "", fmt.Errorf("kms encrypt failed: %v", err)
		}
		content = response.CiphertextBlob
	}
	return content, nil
}

func (client *ConfigClient) getConfigInner(param vo.ConfigParam) (content string, err error) {
	if len(param.DataId) <= 0 {
		err = errors.New("[client.GetConfig] param.dataId can not be empty")
		return "", err
	}
	if len(param.Group) <= 0 {
		param.Group = constant.DEFAULT_GROUP
	}

	//todo 获取容灾配置的 EncryptedDataKey LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover
	clientConfig, _ := client.GetClientConfig()
	cacheKey := util.GetConfigCacheKey(param.DataId, param.Group, clientConfig.NamespaceId)
	content = cache.GetFailover(cacheKey, client.configCacheDir)
	if len(content) > 0 {
		logger.GetLogger().Warn(fmt.Sprintf("%s %s %s is using failover content!", clientConfig.NamespaceId, param.Group, param.DataId))
		return content, nil
	}
	response, err := client.configProxy.queryConfig(param.DataId, param.Group, clientConfig.NamespaceId,
		clientConfig.TimeoutMs, false, client)
	if err != nil {
		logger.Infof("get config from server error:%+v ", err)
		content, err = cache.ReadConfigFromFile(cacheKey, client.configCacheDir)
		if err != nil {
			logger.Errorf("get config from cache  error:%+v ", err)
			return "", errors.New("read config from both server and cache fail")
		}
		return content, nil
	}
	return response.Content, nil
}

func (client *ConfigClient) PublishConfig(param vo.ConfigParam) (published bool, err error) {
	if len(param.DataId) <= 0 {
		err = errors.New("[client.PublishConfig] param.dataId can not be empty")
		return
	}
	if len(param.Content) <= 0 {
		err = errors.New("[client.PublishConfig] param.content can not be empty")
		return
	}

	if len(param.Group) <= 0 {
		param.Group = constant.DEFAULT_GROUP
	}
	if param.Content, err = client.encrypt(param.DataId, param.Content); err != nil {
		return
	}

	clientConfig, _ := client.GetClientConfig()
	request := rpc_request.NewConfigPublishRequest(param.Group, param.DataId, clientConfig.NamespaceId, param.Content, param.CasMd5)
	request.AdditionMap["tag"] = param.Tag
	request.AdditionMap["appName"] = param.AppName
	request.AdditionMap["betaIps"] = param.BetaIps
	request.AdditionMap["type"] = param.Type
	request.AdditionMap["encryptedDataKey"] = param.EncryptedDataKey
	rpcClient := client.configProxy.getRpcClient(client)
	response, err := client.configProxy.requestProxy(rpcClient, request, constant.DEFAULT_TIMEOUT_MILLS)
	if response != nil {
		return response.IsSuccess(), err
	}
	return false, err
}

func (client *ConfigClient) DeleteConfig(param vo.ConfigParam) (deleted bool, err error) {
	if len(param.DataId) <= 0 {
		err = errors.New("[client.DeleteConfig] param.dataId can not be empty")
	}
	if len(param.Group) <= 0 {
		param.Group = constant.DEFAULT_GROUP
	}
	if err != nil {
		return false, err
	}
	clientConfig, _ := client.GetClientConfig()
	request := rpc_request.NewConfigRemoveRequest(param.Group, param.DataId, clientConfig.NamespaceId)
	rpcClient := client.configProxy.getRpcClient(client)
	response, err := client.configProxy.requestProxy(rpcClient, request, constant.DEFAULT_TIMEOUT_MILLS)
	if response != nil {
		return response.IsSuccess(), err
	}
	return false, err
}

//Cancel Listen Config
func (client *ConfigClient) CancelListenConfig(param vo.ConfigParam) (err error) {
	clientConfig, err := client.GetClientConfig()
	if err != nil {
		logger.Errorf("[checkConfigInfo.GetClientConfig] failed,err:%+v", err)
		return
	}
	client.cacheMap.Remove(util.GetConfigCacheKey(param.DataId, param.Group, clientConfig.NamespaceId))
	logger.Infof("Cancel listen config DataId:%s Group:%s", param.DataId, param.Group)
	return err
}

func (client *ConfigClient) ListenConfig(param vo.ConfigParam) (err error) {
	if len(param.DataId) <= 0 {
		err = errors.New("[client.ListenConfig] DataId can not be empty")
		return err
	}
	if len(param.Group) <= 0 {
		err = errors.New("[client.ListenConfig] Group can not be empty")
		return err
	}
	clientConfig, err := client.GetClientConfig()
	if err != nil {
		err = errors.New("[checkConfigInfo.GetClientConfig] failed")
		return err
	}

	key := util.GetConfigCacheKey(param.DataId, param.Group, clientConfig.NamespaceId)
	var cData *cacheData
	if v, ok := client.cacheMap.Get(key); ok {
		cData = v.(*cacheData)
		cData.isInitializing = true
	} else {
		var (
			content string
			md5Str  string
		)
		content, _ = cache.ReadConfigFromFile(key, client.configCacheDir)
		if len(content) > 0 {
			md5Str = util.Md5(content)
		}
		listener := &cacheDataListener{
			listener: param.OnChange,
			lastMd5:  md5Str,
		}

		cData = &cacheData{
			isInitializing:    true,
			dataId:            param.DataId,
			group:             param.Group,
			tenant:            clientConfig.NamespaceId,
			content:           content,
			md5:               md5Str,
			cacheDataListener: listener,
			taskId:            client.cacheMap.Count() / perTaskConfigSize,
			configClient:      client,
		}
	}
	client.cacheMap.Set(key, cData)
	return
}

func (client *ConfigClient) SearchConfig(param vo.SearchConfigParm) (*model.ConfigPage, error) {
	return client.searchConfigInner(param)
}

func (client *ConfigClient) CloseClient() {
	client.configProxy.getRpcClient(client).Shutdown()
}

func (client *ConfigClient) searchConfigInner(param vo.SearchConfigParm) (*model.ConfigPage, error) {
	if param.Search != "accurate" && param.Search != "blur" {
		return nil, errors.New("[client.searchConfigInner] param.search must be accurate or blur")
	}
	if param.PageNo <= 0 {
		param.PageNo = 1
	}
	if param.PageSize <= 0 {
		param.PageSize = 10
	}
	clientConfig, _ := client.GetClientConfig()
	configItems, err := client.configProxy.searchConfigProxy(param, clientConfig.NamespaceId, clientConfig.AccessKey, clientConfig.SecretKey)
	if err != nil {
		logger.Errorf("search config from server error:%+v ", err)
		if _, ok := err.(*nacos_error.NacosError); ok {
			nacosErr := err.(*nacos_error.NacosError)
			if nacosErr.ErrorCode() == "404" {
				return nil, errors.New("config not found")
			}
			if nacosErr.ErrorCode() == "403" {
				return nil, errors.New("get config forbidden")
			}
		}
		return nil, err
	}
	return configItems, nil
}

func (client *ConfigClient) startInternal() {
	timer := time.NewTimer(executorErrDelay)
	go func() {
		for {
			select {
			case <-client.listenExecute:
				client.executeConfigListen()
			case <-timer.C:
				client.executeConfigListen()
			}
			timer.Reset(executorErrDelay)
		}
	}()
}

func (client *ConfigClient) executeConfigListen() {
	listenCachesMap := make(map[int][]*cacheData, 16)
	needAllSync := time.Since(client.lastAllSyncTime) >= constant.ALL_SYNC_INTERNAL
	for _, v := range client.cacheMap.Items() {
		cache, ok := v.(*cacheData)
		if !ok {
			continue
		}

		if cache.isSyncWithServer {
			if cache.md5 != cache.cacheDataListener.lastMd5 {
				go cache.cacheDataListener.listener(cache.tenant, cache.group, cache.dataId, cache.content)
				cache.cacheDataListener.lastMd5 = cache.md5
			}
			if !needAllSync {
				continue
			}
		}
		var cacheDatas []*cacheData
		if cacheDatas, ok = listenCachesMap[cache.taskId]; ok {
			cacheDatas = append(cacheDatas, cache)
		} else {
			cacheDatas = append(cacheDatas, cache)
		}
		listenCachesMap[cache.taskId] = cacheDatas
	}
	hasChangedKeys := false
	if len(listenCachesMap) > 0 {
		for taskId, listenCaches := range listenCachesMap {
			request := buildConfigBatchListenRequest(listenCaches)
			rpcClient := client.configProxy.createRpcClient(fmt.Sprintf("%d", taskId), client)
			iResponse, err := client.configProxy.requestProxy(rpcClient, request, 3000)
			if err != nil {
				logger.Warnf("ConfigBatchListenRequest failure,err:%+v", err)
				continue
			}
			if iResponse == nil && !iResponse.IsSuccess() {
				continue
			}
			changeKeys := make(map[string]struct{})
			if response, ok := iResponse.(*rpc_response.ConfigChangeBatchListenResponse); ok {
				if len(response.ChangedConfigs) > 0 {
					hasChangedKeys = true
					for _, v := range response.ChangedConfigs {
						changeKey := util.GetConfigCacheKey(v.DataId, v.Group, v.Tenant)
						changeKeys[changeKey] = struct{}{}
						if cache, ok := client.cacheMap.Get(changeKey); !ok {
							continue
						} else {
							cacheData := cache.(*cacheData)
							client.refreshContentAndCheck(cacheData, !cacheData.isInitializing)
						}
					}
				}

				for _, v := range listenCaches {
					changeKey := util.GetConfigCacheKey(v.dataId, v.group, v.tenant)
					if _, ok := changeKeys[changeKey]; !ok {
						v.isSyncWithServer = true
						continue
					}
					v.isInitializing = true
				}
			}
		}
	}
	if needAllSync {
		client.lastAllSyncTime = time.Now()
	}

	if hasChangedKeys {
		client.notifyListenConfig()
	}
	monitor.GetListenConfigCountMonitor().Set(float64(client.cacheMap.Count()))
}

func buildConfigBatchListenRequest(caches []*cacheData) *rpc_request.ConfigBatchListenRequest {
	request := rpc_request.NewConfigBatchListenRequest(len(caches))
	for _, cache := range caches {
		request.ConfigListenContexts = append(request.ConfigListenContexts,
			model.ConfigListenContext{Group: cache.group, Md5: cache.md5, DataId: cache.dataId, Tenant: cache.tenant})
	}
	return request
}

func (client *ConfigClient) refreshContentAndCheck(cacheData *cacheData, notify bool) {
	configQueryResponse, err := client.configProxy.queryConfig(cacheData.dataId, cacheData.group, cacheData.tenant,
		constant.DEFAULT_TIMEOUT_MILLS, notify, client)
	if err != nil {
		logger.Errorf("refresh content and check md5 fail ,dataId=%s,group=%s,tenant=%s ", cacheData.dataId,
			cacheData.group, cacheData.tenant)
		return
	}
	cacheData.content = configQueryResponse.Content
	cacheData.contentType = configQueryResponse.ContentType
	if notify {
		logger.Infof("[config_rpc_client] [data-received] dataId=%s, group=%s, tenant=%s, md5=%s, content=%s, type=%s",
			cacheData.dataId, cacheData.group, cacheData.tenant, cacheData.md5,
			util.TruncateContent(cacheData.content), cacheData.contentType)
	}
	cacheData.md5 = util.Md5(cacheData.content)
	if cacheData.md5 != cacheData.cacheDataListener.lastMd5 {
		go cacheData.cacheDataListener.listener(cacheData.tenant, cacheData.group, cacheData.dataId, cacheData.content)
		cacheData.cacheDataListener.lastMd5 = cacheData.md5
		client.cacheMap.Set(util.GetConfigCacheKey(cacheData.dataId, cacheData.group, cacheData.tenant), cacheData)
	}
}

func (client *ConfigClient) notifyListenConfig() {
	client.listenExecute <- struct{}{}
}
